import com.tplhk.thread.completeableFuture.service.MedalInfo;
import com.tplhk.thread.completeableFuture.service.MedalService;
import com.tplhk.thread.completeableFuture.service.UserInfo;
import com.tplhk.thread.completeableFuture.service.UserInfoService;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import java.time.LocalDateTime;
import java.util.Random;
import java.util.concurrent.*;

/**
 * @ClassName : FutureLesson1
 * @Description : 一个例子走进CompletableFuture
 * @Author : jqxu
 * @Date: 2022/4/27 9:11
 **/
@Slf4j
@SpringBootTest(classes = {FutureLesson1.class})
public class FutureLesson1 {

    /**
     * 1. 一个入门的例子
     *
     * @throws InterruptedException
     * @throws TimeoutException
     * @throws ExecutionException
     */
    @Test
    public void simpleExample() throws InterruptedException, TimeoutException, ExecutionException {
        UserInfoService userInfoService = new UserInfoService();
        MedalService medalService = new MedalService();
        String userId = "666";
        long startTime = System.currentTimeMillis();

        //调用用户服务获取用户基本信息
        CompletableFuture<UserInfo> completableUserInfoFuture = CompletableFuture.supplyAsync(() -> userInfoService.getUserInfo(userId));

        Thread.sleep(300); //模拟主线程其它操作耗时

        CompletableFuture<MedalInfo> completableMedalInfoFuture = CompletableFuture.supplyAsync(() -> medalService.getMedalInfo(userId));

        //下面是等待2秒，如果没有返回，报 TimeoutException
        UserInfo userInfo = completableUserInfoFuture.get(2, TimeUnit.SECONDS);//获取个人信息结果
        // 下面是一直等待，相当于阻塞
        MedalInfo medalInfo = completableMedalInfoFuture.get();//获取勋章信息结果

        log.info(":::userinfo = {}", userInfo.toString());
        log.info(":::medalInfo = {}", medalInfo.toString());
        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");

    }

    /**
     * 2. 创建异步任务
     * supplyAsync
     * 执行CompletableFuture任务，支持返回值
     * 使用默认内置线程池ForkJoinPool.commonPool() 执行任务，或者自定义线程执行任务
     * runAsync
     * 执行CompletableFuture任务，没有返回值。
     * <p>
     * get() 方法需要抛出 InterruptedException,  ExecutionException 异常，join() 方法不用抛异
     *
     * 如果异步任务抛出异常，是看不到异常的，需要调用 get()/join() 方法获取异常信息 并且中止运行
     * 建议 try…catch…处理异常 或者使用 exceptionally 或者使用 handle() 方法处理异常，。
     *
     * 默认线程池的注意点
     * 默认的线程池，处理的线程个数是电脑CPU核数-1。在大量请求过来的时候，处理逻辑复杂的话，响应会很慢。
     * 一般建议使用自定义线程池，优化线程池配置参数。
     * 拒绝策略建议，最好使用AbortPolicy，然后耗时的异步线程，做好线程池隔离!
     * 线程池隔离实现：自定义多个线程池，设置线程名称，隔离不同业务线程池。
     *
     * @throws InterruptedException
     * @throws TimeoutException
     * @throws ExecutionException   本例返回内容：
     *                              run,关注公众号:捡田螺的小男孩
     *                              null
     *                              supply,关注公众号:捡田螺的小男孩捡田螺的小男孩
     */
    @Test
    public void supplyAsyncAndRunAsync() {
        log.info("{} : {}", LocalDateTime.now(), "开始执行");
        //可以自定义线程池
        ExecutorService executor = Executors.newCachedThreadPool();
        //runAsync的使用
        CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("{} : {}", LocalDateTime.now(), "run,关注公众号:捡田螺的小男孩");
        }, executor);

        //supplyAsync的使用
        CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("{} : {}", LocalDateTime.now(), "supply,关注公众号:捡田螺的小男孩");
            return "捡田螺的小男孩";
        }, executor);
        //runAsync的future没有返回值，输出null， join() 是阻塞的
        log.info("{} : {}", LocalDateTime.now(), runFuture.join());
        //supplyAsync的future，有返回值， join() 是阻塞的
        log.info("{} : {}", LocalDateTime.now(), supplyFuture.join());
        executor.shutdown(); // 线程池需要关闭

    }

    /**
     * 3.1 变换 thenRun/thenRunAsync
     * 做完第一个任务后，再做第二个任务
     * 无输入无输出
     * 这两个区别是 thenRunAsync 可以指定自定义线程池,其它 *Async 结尾的也是相同的逻辑
     *
     * @throws InterruptedException
     * @throws TimeoutException
     * @throws ExecutionException
     */
    @Test
    public void thenRunAndthenRunAsync() {
        log.info("{} : {}", LocalDateTime.now(), "开始执行");
        ExecutorService executor = Executors.newCachedThreadPool();
        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(3 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("{} : {}", LocalDateTime.now(), "先执行第一个CompletableFuture方法任务");
                    return "捡田螺的小男孩";
                }
        );

        CompletableFuture thenRunFuture = orgFuture.thenRun(() -> {
            try {
                Thread.sleep(2 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("{} : {}", LocalDateTime.now(), "接着执行第二个任务");
        });
        log.info("{} : {}", LocalDateTime.now(), thenRunFuture.join());
    }


    /**
     * 3.2 变换 thenAccept/thenAcceptAsync
     * 做完第一个任务后，再做第二个任务
     * 有输入无输出
     * 这两个区别是 thenAcceptAsync 可以指定自定义线程池,其它 *Async 结尾的也是相同的逻辑
     *
     * @throws InterruptedException
     * @throws TimeoutException
     * @throws ExecutionException
     */
    @Test
    public void thenAcceptAndthenAcceptAsync() {
        log.info("{} : {}", LocalDateTime.now(), "开始执行");
        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(2 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("{} : {}", LocalDateTime.now(), "先执行第一个CompletableFuture方法任务");
                    return "捡田螺的小男孩";
                }
        );

        CompletableFuture thenAcceptFuture = orgFuture.thenAccept((a) -> {
            log.info("{} : {}", LocalDateTime.now(), a);
            try {
                Thread.sleep(2 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("{} : {}", LocalDateTime.now(), "接着执行第二个任务");
        });
        log.info("{} : {}", LocalDateTime.now(), thenAcceptFuture.join());
    }

    /**
     * 3.3 变换 thenApply/thenApplyAsync
     * 做完第一个任务后，再做第二个任务
     * 有输入有输出
     * 这两个区别是 thenApplyAsync 可以指定自定义线程池,其它 *Async 结尾的也是相同的逻辑
     *
     * @throws InterruptedException
     * @throws TimeoutException
     * @throws ExecutionException
     */
    @Test
    public void thenApplyAndthenApplyAsync() {
        log.info("{} : {}", LocalDateTime.now(), "开始执行");
        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(20 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("{} : {}", LocalDateTime.now(), "先执行第一个CompletableFuture方法任务");
                    return "捡田螺的小男孩";
                }
        );

        CompletableFuture<String> thenApplyAsyncFuture = orgFuture.thenApplyAsync((a) -> {
            log.info("{} : {}", LocalDateTime.now(), a);
            try {
                Thread.sleep(20 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("{} : {}", LocalDateTime.now(), "接着执行第二个任务");
            return "第二个任务完成";
        });
        log.info("{} : {}", LocalDateTime.now(), thenApplyAsyncFuture.join());
    }


    /**
     * 3.4 变换 thenCompose
     * 返回一个 CompletableFuture 或者其他对象。
     * 有输入有输出
     *
     * @throws InterruptedException
     * @throws TimeoutException
     * @throws ExecutionException
     */
    @Test
    public void thenCompose() {
        log.info("{} : {}", LocalDateTime.now(), "开始执行");
        CompletableFuture<String> first = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(10 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("{} : {}", LocalDateTime.now(), "先执行第一个CompletableFuture方法任务");
                    return null;
                }
        );

        CompletableFuture<String> second = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(10 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("{} : {}", LocalDateTime.now(), "执行第二个CompletableFuture方法任务");
                    return "捡田螺的小男孩second";
                }
        );

        CompletableFuture<String> third = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(10 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("{} : {}", LocalDateTime.now(), "执行第三个CompletableFuture方法任务");
                    return "捡田螺的小男孩third";
                }
        );

        CompletableFuture<String> thenComposeFuture = first.thenCompose((a) -> {
            log.info("{} : {}", LocalDateTime.now(), a);
            try {
                Thread.sleep(10 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("{} : {}", LocalDateTime.now(), "执行thenComposeFuture任务");
            return third;
        });
        log.info("{} : {}", LocalDateTime.now(), thenComposeFuture.join());

    }


    /**
     * 4.1 消费异常 exceptionally
     * 消费异常: 一般配置whenComplete() 使用。异常必须抛出，否则 exceptionally 无效
     * 有输入(异常)有输出
     */
    @Test
    public void exceptionally() {
        log.info("{} : {}", LocalDateTime.now(), "开始执行");
        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(2 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 异常必须抛出，否则 exceptionally 无效
                    int i = 1 / 0;
                    log.info("{} : {}", LocalDateTime.now(), "运算异常: 1/0");
                    return "捡田螺的小男孩";
                }
        );

        CompletableFuture<String> exceptionallyFuture = orgFuture.exceptionally((a) -> {
            log.info("{} : {}", LocalDateTime.now(), a.getMessage());
            try {
                Thread.sleep(1 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "你的程序异常啦";
        });
        log.info("{} : {}", LocalDateTime.now(), exceptionallyFuture.join());
    }

    /**
     * 4.2 消费结果 whenComplete
     * 消费结果: whenComplete 本身没有返回值，但是 get() 得到的是上一个任务的结果
     * 有输入(异常)无输出
     */
    @Test
    public void whenComplete() throws ExecutionException, InterruptedException {
        log.info("{} : {}", LocalDateTime.now(), "开始执行");
        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(2 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("{} : {}", LocalDateTime.now(), "有可能运算异常: 1/0");
                    if (new Random().nextInt(10) % 2 == 0) {
                        log.info("抛了个运算异常");
                        int i = 12 / 0;
                    }
                    return "捡田螺的小男孩";
                }
        );


        /**
         * 不管 orgFuture 是否运算异常，下面代码一定执行。如果 exceptionally 会执行，那么 whenComplete 和 exceptionally 同时执行
         */
        orgFuture.whenComplete((a, e) -> {
            a = a + "!!!!";
            log.info("whenComplete:::{} : {}", LocalDateTime.now(), a);
            try {
                Thread.sleep(3 * 1000);
            } catch (InterruptedException e1) {
                log.info("{} : {}", LocalDateTime.now(), e1.getMessage());

            }
        });

        /**
         * orgFuture 运算异常，下面代码才会执行。如果 exceptionally 会执行，那么 whenComplete 和 exceptionally 同时执行
         */
        orgFuture.exceptionally((e) -> {
            log.info("exceptionally:::{} : {}", LocalDateTime.now(), e.getMessage());
            try {
                Thread.sleep(1 * 1000);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
            return "你的程序异常啦";
        });

        log.info("主程序:::::{} : {}", LocalDateTime.now(), orgFuture.get());
    }

    /**
     * 4.3 消费 handle
     * handle: 相当于 whenComplete + exceptionally.
     * 如果抛了异常， handle(a, e) 接收的参数为 a = null , e有值 ；
     * 如果正常， handle(a, e) 接收的参数为 a = 有值 , e = null ；
     */
    @Test
    public void handle() throws ExecutionException, InterruptedException {
        log.info("{} : {}", LocalDateTime.now(), "开始执行");
        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(10 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("{} : {}", LocalDateTime.now(), "有可能运算异常: 1/0");
                    if (new Random().nextInt(10) % 2 == 0) {
                        log.info("抛了个运算异常");
                        int i = 12 / 0;
                    }
                    return "捡田螺的小男孩";
                }
        );


        /**
         * 不管 orgFuture 是否运算异常，下面代码一定执行。如果 exceptionally 会执行，那么 whenComplete 和 exceptionally 同时执行
         */
        CompletableFuture<String> handle = orgFuture.handle((a, e) -> {
            a = a + "!!!!";
            log.info("handle:::{} : {}", LocalDateTime.now(), a);
            log.info("handle:::{} : {}", LocalDateTime.now(), e);
            try {
                Thread.sleep(10 * 1000);
            } catch (InterruptedException e1) {
                log.info("{} : {}", LocalDateTime.now(), e1.getMessage());

            }
            return "handle 返回结果啦";
        });

        log.info("主程序:::::{} : {}", LocalDateTime.now(), handle.get());
    }


    /**
     * 5.1 组合 and
     * 触发条件：两个任务执行完
     * runAfterBoth/runAfterBothAsync: 无输入无输出
     * thenAcceptBoth/thenAcceptBothAsync: 有输入无输出
     * thenCombine/thenCombineAsync: 有输入有输出
     */
    @Test
    public void thenCombine() throws ExecutionException, InterruptedException {
        log.info("{} : {}", LocalDateTime.now(), "开始执行");
        CompletableFuture<String> first = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(2 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("{} : {}", LocalDateTime.now(), "first......");
                    return "捡田螺的小男孩first";
                }
        ).thenApply((a) -> {
            try {
                Thread.sleep(2 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("{} : a:{}, {}", LocalDateTime.now(), a, "first+20s......");
            return "捡田螺的小男孩first";
        });

        CompletableFuture<String> second = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(3 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("{} : {}", LocalDateTime.now(), "second......");
                    return "捡田螺的小男孩second";
                }
        );

        CompletableFuture<String> completableFuture = second.thenCombineAsync(first, (f, s) -> {
            log.info("{} : {}", LocalDateTime.now(), "completableFuture......");
            log.info("f = {} : {}", LocalDateTime.now(), f);
            log.info("s = {} : {}", LocalDateTime.now(), s);
            return "两个异步任务的组合执行完了";
        });

        log.info("主程序:::::{} : {}", LocalDateTime.now(), completableFuture.get());

    }


    /**
     * 5.2 组合 or
     * 触发条件：两个任务完成一个 applyToEither / acceptEither / runAfterEither
     * runAfterEither/runAfterEitherAsync: 无输入无输出
     * acceptEither/acceptEitherAsync: 有输入无输出
     * applyToEither/applyToEitherAsync: 有输入有输出
     */
    @Test
    public void applyToEither() throws ExecutionException, InterruptedException {
        log.info("{} : {}", LocalDateTime.now(), "开始执行");
        CompletableFuture<String> first = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(20 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("{} : {}", LocalDateTime.now(), "first......");
                    return "捡田螺的小男孩first";
                }
        );

        CompletableFuture<String> second = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(30 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("{} : {}", LocalDateTime.now(), "second......");
                    return "捡田螺的小男孩second";
                }
        );

        CompletableFuture<String> applyToEitherFuture = second.applyToEither(first, (f) -> {
            log.info("{} : {}", LocalDateTime.now(), "applyToEitherFuture......");
            log.info("f = {} : {}", LocalDateTime.now(), f);
            return "其中一个异步任务的组合执行完了";
        });

        log.info("主程序:::::{} : {}", LocalDateTime.now(), applyToEitherFuture.get());
    }

    /**
     * 5.3 组合 allof
     * 触发条件：所有任务全部执行完
     */
    @Test
    public void allof() throws ExecutionException, InterruptedException {
        log.info("{} : {}", LocalDateTime.now(), "开始执行");
        CompletableFuture<String> first = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(10 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("{} : {}", LocalDateTime.now(), "first......");
                    return "捡田螺的小男孩first";
                }
        );

        CompletableFuture<String> second = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(15 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("{} : {}", LocalDateTime.now(), "second......");
                    return "捡田螺的小男孩second";
                }
        );

        CompletableFuture<String> thirth = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(20 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
//                    int i = 1/0;
                    log.info("{} : {}", LocalDateTime.now(), "thirth......");
                    return "捡田螺的小男孩thirth";
                }
        );

        CompletableFuture<String> allofFuture = CompletableFuture.allOf(first, second, thirth).handle((a, e) -> {
            log.info("handle:::{} : {}", LocalDateTime.now(), a);
            log.info("handle:::{} : {}", LocalDateTime.now(), e);
            if (e != null) {
                return "有任务出错";
            }
            return "所有任务全部执行完";
        });

        log.info("主程序:::::{} : {}", LocalDateTime.now(), allofFuture.get());
    }

    /**
     * 5.4 组合 anyOf
     * 触发条件：任何一个任务执行完
     */
    @Test
    public void anyOf() throws ExecutionException, InterruptedException {
        log.info("{} : {}", LocalDateTime.now(), "开始执行");
        CompletableFuture<String> first = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(10 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("{} : {}", LocalDateTime.now(), "first......");
                    return "捡田螺的小男孩first";
                }
        );

        CompletableFuture<String> second = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(15 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("{} : {}", LocalDateTime.now(), "second......");
                    return "捡田螺的小男孩second";
                }
        );

        CompletableFuture<String> thirth = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Thread.sleep(20 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
//                    int i = 1/0;
                    log.info("{} : {}", LocalDateTime.now(), "thirth......");
                    return "捡田螺的小男孩thirth";
                }
        );

        CompletableFuture<String> anyOfFuture = CompletableFuture.anyOf(first, second, thirth).handle((a, e) -> {
            log.info("handle:::{} : {}", LocalDateTime.now(), a);
            log.info("handle:::{} : {}", LocalDateTime.now(), e);
            if (e != null) {
                return "有任务出错";
            }
            return "所有任务全部执行完";
        });

        log.info("主程序:::::{} : {}", LocalDateTime.now(), anyOfFuture.get());
    }

}
